"""
Plugin for uploading output files to S3 "progressively," meaning to upload each task's output files
immediately upon task completion, instead of waiting for the whole workflow to finish. (The latter
technique, which doesn't need a plugin at all, is illustrated in ../upload_output_files.sh)

To enable, install this plugin (`pip3 install .` & confirm listed by `miniwdl --version`) and set
the environment variable MINIWDL__S3_PROGRESSIVE_UPLOAD__FOLDER to a S3 folder URI under which to
store the output files, e.g.: s3://my_bucket/wdl_output

The files will be uploaded into a subfolder structure reflecting the miniwdl run directory tree for
task and subworkflow calls. Usually this includes the timestamp-prefixed, top-level run directory
name. If MINIWDL__S3_PROGRESSIVE_UPLOAD__FOLDER ends in / then this top-level name component won't
be included. That makes the S3 paths more predictable, but then more care is needed to prevent
different runs from overwriting each others' outputs.

Shells out to the AWS CLI, which must be pre-configured so that "aws s3 cp ..." into the specified
bucket works (without explicit auth-related arguments).

Deposits an additional file, outputs.s3.json, into each successful task/workflow run directory and
its S3 folder; it copies outputs.json with local file paths replaced by the uploaded S3 URIs.
(The JSON printed to miniwdl standard output keeps local paths.)

Limitations:
1) All task output files are uploaded, even ones that aren't top-level workflow outputs. (We can't,
   at the moment of task completion, necessarily predict which files the calling workflow will
   finally output.)
2) Doesn't upload (or rewrite outputs JSON for) workflow output files that weren't generated by a
   task, e.g. outputting an input file, or a file generated by write_lines() etc. in the workflow.
   (We could handle such stragglers by uploading them at workflow completion; it just hasn't been
   needed yet.)
3) Doesn't yet handle WDL Directory outputs.
"""

import os
import subprocess
import threading
import json
import WDL
from WDL._util import StructuredLogMessage as _, write_values_json, write_atomic

# Table of the S3 URIs of already-uploaded files, keyed by local inode (to look up from any local
# symlink or hardlink). Maps (st_dev, st_ino) -> "s3://..." URI; see inode() and s3cp().
_uploaded_files = {}
# Guards _uploaded_files, which is shared across miniwdl's concurrent task-runner threads.
_uploaded_files_lock = threading.Lock()


def task(cfg, logger, call_id_stack, run_dir, task, **recv):
    """
    Task plugin coroutine: on completion of any task, upload its output files to S3, and record
    the S3 URI corresponding to each local file (keyed by inode) in _uploaded_files.

    miniwdl advances this coroutine at each stage of the task lifecycle; we pass ``recv`` through
    unchanged at every ``yield`` and act only once the task has finished.

    :param cfg: miniwdl configuration (read for the [s3_progressive_upload] folder setting)
    :param logger: parent logger; messages go to its "s3_progressive_upload" child
    :param call_id_stack: run/call IDs from the root run down to this task call
    :param run_dir: this task's local run directory
    :param task: the WDL task being run (only its name is used here)
    :param recv: plugin-protocol payload dict, passed through at each yield
    """
    logger = logger.getChild("s3_progressive_upload")

    s3prefix = formulate_s3prefix(cfg, run_dir, call_id_stack)  # None => uploading disabled

    # ignore inputs
    recv = yield recv
    # ignore command/runtime/container
    try:
        recv = yield recv
    finally:
        if s3prefix:
            # upload stdout & stderr (even if the task failed -- hence finally around the yield)
            for log_fn in ("stderr.txt", "stdout.txt"):
                log_afn = os.path.join(run_dir, log_fn)
                if os.path.isfile(log_afn):
                    s3cp(logger, log_afn, os.path.join(s3prefix, log_fn))

    if s3prefix:
        # for each file under out/ (links to the task's output files)
        def _raise(ex):
            # surface os.walk errors instead of silently skipping unreadable directories
            raise ex

        links_dir = os.path.join(run_dir, "out")
        for (dn, subdirs, files) in os.walk(links_dir, onerror=_raise):
            assert dn == links_dir or dn.startswith(links_dir + "/")
            for fn in files:
                if fn == ".WDL_Directory":
                    continue  # TODO: handle Directory output
                # upload file to S3, mirroring its relative path under out/
                s3uri = os.path.join(s3prefix, dn[(len(links_dir) + 1) :], fn)
                s3cp(logger, os.path.join(dn, fn), s3uri)

        # write outputs.s3.json using _uploaded_files
        write_outputs_s3_json(logger, recv["outputs"], run_dir, s3prefix, task.name)

    yield recv


def workflow(cfg, logger, call_id_stack, run_dir, workflow, **recv):
    """
    Workflow plugin coroutine: once the (sub)workflow completes, deposit outputs.s3.json in its
    run directory -- a copy of outputs.json with local file paths rewritten to the S3 URIs
    recorded earlier as each task finished.

    NOTE: miniwdl also invokes this routine for subworkflow calls within the main workflow.
    """
    logger = logger.getChild("s3_progressive_upload")

    # pass the workflow inputs through untouched
    recv = yield recv

    prefix = formulate_s3prefix(cfg, run_dir, call_id_stack)
    if prefix:
        # rewrite & upload the outputs JSON based on _uploaded_files
        write_outputs_s3_json(logger, recv["outputs"], run_dir, prefix, workflow.name)

    yield recv


def formulate_s3prefix(cfg, run_dir, call_id_stack):
    """
    Determine the destination S3 folder URI for this run/call/subworkflow, or return None when
    uploading is disabled (no configured folder) or this is an input-download sub-run.
    """
    if not cfg.has_option("s3_progressive_upload", "folder"):
        return None
    if call_id_stack[-1].startswith("download-"):
        return None

    folder = cfg["s3_progressive_upload"]["folder"]
    assert folder.startswith("s3://"), "MINIWDL__S3_PROGRESSIVE_UPLOAD__FOLDER invalid"

    # Drop the "call-" prefixes from the ID stack that determines our S3 subfolder structure.
    # Those prefixes are needed locally so call subdirectory names can't collide with other
    # entries miniwdl creates in the run directory (e.g. a call named "out"), but the clutter
    # isn't needed in our S3 keys, since we selectively control what we put there. Also discard
    # the stack's first element (the root run's own ID).
    subfolders = [(part[5:] if part.startswith("call-") else part) for part in call_id_stack[1:]]

    if folder.endswith("/"):
        folder = folder[:-1]
    else:
        # no trailing slash configured: prepend the (timestamped) top-level run directory name,
        # found by walking up from run_dir once per remaining stack component
        top_dir = run_dir
        for _level in range(len(subfolders)):
            top_dir = os.path.dirname(top_dir)
        subfolders.insert(0, os.path.basename(top_dir))

    return os.path.join(folder, *subfolders)


def write_outputs_s3_json(logger, outputs, run_dir, s3prefix, namespace):
    """
    Write outputs.s3.json into run_dir: a copy of the run's outputs with each local file path
    rewritten to its previously-uploaded S3 URI (falling back to the local path, with a warning,
    for any file that wasn't uploaded). Also upload outputs.s3.json itself to s3prefix, and drop
    outputs.s3folder.txt (containing s3prefix) into run_dir for the caller's convenience.
    """

    # rewrite uploaded files to their S3 URIs
    def rewriter(v):
        fn = v.value  # TODO: handle Directory
        try:
            return _uploaded_files[inode(fn)]
        except (KeyError, OSError):
            # KeyError: file wasn't uploaded; OSError: file can't be stat'ed. Either way fall
            # back to the local path. (Narrowed from a bare except:, which would also have
            # swallowed KeyboardInterrupt/SystemExit.)
            logger.warning(
                _(
                    "output file wasn't uploaded to S3; keeping local path in outputs.s3.json",
                    file=fn,
                )
            )
            return fn

    # hold the lock so concurrent task completions can't mutate _uploaded_files mid-rewrite
    with _uploaded_files_lock:
        outputs_s3 = WDL.Value.rewrite_env_paths(outputs, rewriter)

    # write the modified outputs to outputs.s3.json
    outputs_fn = os.path.join(run_dir, "outputs.s3.json")
    write_values_json(outputs_s3, outputs_fn, namespace=namespace)
    # and upload
    s3cp(logger, outputs_fn, os.path.join(s3prefix, "outputs.s3.json"))

    # also drop a local file with s3prefix. since we can't control miniwdl stdout from this plugin,
    # it might otherwise be tricky for the caller to figure this out.
    write_atomic(s3prefix, os.path.join(run_dir, "outputs.s3folder.txt"))


def s3cp(logger, fn, s3uri):
    """
    Upload local file fn to s3uri via the AWS CLI, skipping any file already uploaded (identified
    by inode), and record the destination URI in _uploaded_files on success.

    Raises WDL.Error.RuntimeError if the `aws s3 cp` command exits nonzero.
    """
    key = inode(fn)

    # elide already-uploaded files
    with _uploaded_files_lock:
        already_done = key in _uploaded_files
    if already_done:
        return

    # shell out to `aws s3 cp` instead of calling boto3 directly, to minimize contention added to
    # miniwdl's GIL
    cmd = ["aws", "s3", "cp", fn, s3uri, "--follow-symlinks", "--only-show-errors"]
    logger.debug(" ".join(cmd))
    proc = subprocess.run(cmd, stderr=subprocess.PIPE)
    if proc.returncode:
        logger.error(
            _(
                "failed uploading output file",
                cmd=" ".join(cmd),
                exit_status=proc.returncode,
                stderr=proc.stderr.decode("utf-8"),
            )
        )
        raise WDL.Error.RuntimeError("failed: " + " ".join(cmd))

    # record in _uploaded_files
    with _uploaded_files_lock:
        _uploaded_files[key] = s3uri
    logger.info(_("uploaded", file=fn, uri=s3uri))


def inode(link):
    """Return a (device, inode) pair uniquely identifying the file that `link` resolves to."""
    resolved = os.path.realpath(link)
    info = os.stat(resolved)
    return (info.st_dev, info.st_ino)
